package cn._51doit.day05;


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * @create: 2021-10-20 11:18
 * @author: 今晚打脑斧
 * @program: CountWindowAllDemo
 * @Description:
 *
 *   先keyBy，再按照ProcessingTime划分滑动窗口
 *
 **/
public class ProcessingTimeSlidingWindowDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        //spark,3
        DataStreamSource<String> lines = env.socketTextStream("doit01", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(line -> {
            String[] fields = line.split(",");
            String word = fields[0];
            int count = Integer.parseInt(fields[1]);
            return Tuple2.of(word, count);
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        //先调用keyBy
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        /**
         * 10秒钟进行一次数据的处理,滑动"距离"为5秒钟
         * 触发之后,所有的分区内的所有的组内,有数据的都会进行处理
         */
        //按照ProcessingTime划分滚动窗口
        //WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));

        //调用window function
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute();

    }
}
